-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ResidualVisitor to compute residuals #1388
base: main
Are you sure you want to change the base?
Conversation
…ta-only-op add count in data scan and test in catalog sql
@@ -1493,6 +1496,13 @@ def to_ray(self) -> ray.data.dataset.Dataset: | |||
|
|||
return ray.data.from_arrow(self.to_arrow()) | |||
|
|||
def count(self) -> int: | |||
res = 0 | |||
tasks = self.plan_files() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this count will not be accurate when there are deletes files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @kevinjqliu thank you for the review. I am trying to account for positional deletes, do you have a suggestion on how that can be achieved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this can be widely off, not just because the merge-on-read deletes, but because plan_files
returns all the files that (might) contain relevant rows. For example, if it cannot be determined if has relevant data, it will be returned by plan_files
.
I think there are two ways forward:
- One is similar to how we handle deletes. For deletes, we check if the whole file matches, if this is the case, then we can simply drop the file from the metadata. You can find the code here. If a file fully matches, is is valid to use
task.file.record_count
. We would need to extend this to also see if there are also merge-on-read deletes as Kevin already mentioned, or just fail when there are positional deletes. - A cleaner option, but this is a bit more work, but pretty exciting, would be to include the
residual-predicate
in theFileScanTask
. When we run a query, likeday_added = 2024-12-01 and user_id = 10
, then theday_added = 2024-12-01
might be satisfied with the partitioning already. This is the case when the table is partitioned by day, and we know that all the data in the file evaluatestrue
forday_added = 2024-12-01
, then we need to open the file, and filter foruser_id = 10
. If we would leave out theuser_id = 10
, then it would beALWAYS_TRUE
, and then we know that we can just usetask.file.record_count
. This way we could very easily loop over the.plan_files()
:
def count(self) -> int:
res = 0
tasks = self.plan_files()
for task in tasks:
if task.residual == ALWAYS_TRUE and len(task.delete_files):
res += task.file.record_count
else:
# pseudocode, open the table, and apply the filter and deletes
res += len(_table_from_scan_task(task))
return res
To get to the second step, we first have to port the ResidualEvaluator
. The java code can be found here, including some excellent tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @Fokko I have added Residual Evaluator with Tests.
Now I am trying to create the breaking tests for count where delete has occurred and the counts should differ
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_delete_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = "default.table_partitioned_delete"
run_spark_commands(
spark,
[
f"DROP TABLE IF EXISTS {identifier}",
f"""
CREATE TABLE {identifier} (
number_partitioned int,
number int
)
USING iceberg
PARTITIONED BY (number_partitioned)
TBLPROPERTIES(
'format-version' = 2,
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
""",
f"""
INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
""",
# Generate a positional delete
f"""
DELETE FROM {identifier} WHERE number = 30
""",
],
)
tbl = session_catalog.load_table(identifier)
# Assert that there is just a single Parquet file, that has one merge on read file
files = list(tbl.scan().plan_files())
assert len(files) == 1
assert len(files[0].delete_files) == 1
# Will rewrite a data file without the positional delete
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10], "number": [20, 40]}
assert tbl.scan().count() == 2
assert tbl.scan().count() == 1
tbl.delete(EqualTo("number", 40))
# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}
assert tbl.scan().count() == 1
Question: Does it make sense to expose this as the |
…-count Residual Evaluator with test
* added residual evaluator in plan files * tested counts with positional deletes * merged main
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit | ||
).to_table([task]) | ||
res += len(tbl) | ||
return res |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love this approach! My only concern is about loading too much data into memory at once, although this is loading just one file at a time, in the worst case some file could potentially be very large? Shall we define a threshold and check, for example, if file size < XXX
, load entire file, otherwise turn it into pa.RecordBatchReader
and read stream of record batches for counting.
target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches([task])
reader = pa.RecordBatchReader.from_batches(
target_schema,
batches,
)
count = 0
for batch in reader:
count += batch.num_rows
return count
https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1541-L1564
* added residual evaluator in plan files * tested counts with positional deletes * merged main * implemented batch reader in count * breaking integration test * fixed integration test * git pull main * revert * revert * revert test_partitioning_key.py * revert test_parser.py * added residual evaluator in visitor * deleted residual_evaluator.py * removed test count from test_sql.py * ignored lint type * fixed lint * working on plan_files * type ignored * make lint
Hi @Fokko @kevinjqliu @gli-chris-hao , I have implemented these suggestions with my best understanding.
It would be helpful to get fresh review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great @tusharchou Thanks for working on this. I left some comments, but this is a great start 🚀
@@ -1731,3 +1731,214 @@ def _can_contain_nulls(self, field_id: int) -> bool: | |||
|
|||
def _can_contain_nans(self, field_id: int) -> bool: | |||
return (nan_count := self.nan_counts.get(field_id)) is not None and nan_count > 0 | |||
|
|||
|
|||
class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we copy this comment?
I think that would be helpful for the reader.
schema: Schema | ||
spec: PartitionSpec | ||
case_sensitive: bool | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add:
expr: BooleanExpression
to the class variables as well?
spec: PartitionSpec | ||
case_sensitive: bool | ||
|
||
def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression): | |
def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression) -> None: |
def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression: | ||
val = term.eval(self.struct) | ||
if val is None: | ||
return self.visit_true() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to Java, I think we can return AlwaysTrue
directly, instead of calling visit_true()
: https://github.com/apache/iceberg/blob/5fd16b5bfeb85e12b5a9ecb4e39504389d7b72ed/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java#L157
return self.visit_true() | |
return AlwaysTrue() |
val = term.eval(self.struct) | ||
if val is not None: | ||
return self.visit_true() | ||
else: | ||
return self.visit_false() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java takes a different approach and checks for NaN
:
val = term.eval(self.struct) | |
if val is not None: | |
return self.visit_true() | |
else: | |
return self.visit_false() | |
if isnan(term.eval(self.struct)): | |
return self.visit_true() | |
else: | |
return self.visit_false() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is causing a type casting problem as term.eval()
returns a L
with can be of typre str, bytes, UUID
with isnan doesn't support
pyiceberg/expressions/visitors.py:1799: error: Argument 1 to "isnan" has incompatible type "str"; expected "Union[SupportsFloat, SupportsIndex]" [arg-type]
pyiceberg/expressions/visitors.py:1799: error: Argument 1 to "isnan" has incompatible type "bytes"; expected "Union[SupportsFloat, SupportsIndex]" [arg-type]
pyiceberg/expressions/visitors.py:1799: error: Argument 1 to "isnan" has incompatible type "UUID"; expected "Union[SupportsFloat, SupportsIndex]" [arg-type]
pyiceberg/expressions/visitors.py:1805: error: Argument 1 to "isnan" has incompatible type "str"; expected "Union[SupportsFloat, SupportsIndex]" [arg-type]
pyiceberg/expressions/visitors.py:1805: error: Argument 1 to "isnan" has incompatible type "bytes"; expected "Union[SupportsFloat, SupportsIndex]" [arg-type]
pyiceberg/expressions/visitors.py:1805: error: Argument 1 to "isnan" has incompatible type "UUID"; expected "Union[SupportsFloat, SupportsIndex]" [arg-type]
@@ -1522,8 +1555,9 @@ def plan_files(self) -> Iterable[FileScanTask]: | |||
data_entry, | |||
positional_delete_entries, | |||
), | |||
residual=residual, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of carrying the residuals all the way through, I think we can compute them at this point instead:
residual=residual, | |
residual=residual_evaluators[manifest.partition_spec_id](data_entry.data_file.partition), |
If a whole manifest is rejected, we don't even compute the evaluator itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko as manifest isn't accessible there can we instead write -
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file.partition),
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tusharchou I don't really think this one through. Without any additional changes, this would look like:
return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition
),
)
for data_entry in data_entries
]
Where we re-create the evaluator all the time
# task.residual is a Boolean Expression if the filter condition is fully satisfied by the | ||
# partition value and task.delete_files represents that positional delete haven't been merged yet | ||
# hence those files have to read as a pyarrow table applying the filter and deletes | ||
if task.residual == AlwaysTrue() and not len(task.delete_files): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a bit more explicit:
if task.residual == AlwaysTrue() and not len(task.delete_files): | |
if task.residual == AlwaysTrue() and len(task.delete_files) == 0: |
if task.residual == AlwaysTrue() and not len(task.delete_files): | ||
# Every File has a metadata stat that stores the file record count | ||
res += task.file.record_count | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the positional deletes are missing from the else:
branch. How about re-using _task_to_record_batches
for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko I have added test of positional delete and it works. ArrowScan.to_record_batches
calls `_task_to_record_batches' internally. Can you please take another look.
projected_schema=self.projection(), | ||
row_filter=self.row_filter, | ||
case_sensitive=self.case_sensitive, | ||
limit=self.limit, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will lead to incorrect results since we don't want to limit the count
limit=self.limit, | |
limit=self.limit, |
arrow_scan = ArrowScan( | ||
table_metadata=self.table_metadata, | ||
io=self.io, | ||
projected_schema=self.projection(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not interested in any fields at all, so we could just pass in an empty schema. This will also relax @gli-chris-hao his concern regarding memory pressure 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple question: What would an empty schema look like passing a 'Schema()` object throws error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was suggesting that indeed, what kind of error? Otherwise we can leave it as is for now and do it in a subsequent PR
closes issue: Count rows as a metadata-only operation #1223